import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

/**
 * Demo of a socket word count that combines a tumbling processing-time window
 * with a {@link ContinuousProcessingTimeTrigger}.
 *
 * <p>Lines are read from a text socket, split by {@code LineSplitter}, keyed by
 * tuple field 0 and summed on tuple field 1 inside a 120-second tumbling window.
 * The continuous trigger fires on a 20-second processing-time interval, so the
 * running aggregate is emitted while the window is still open rather than only
 * once when the window closes.
 *
 * <p>{@code LineSplitter} and {@code Mapdemo} are defined outside this class.
 */
public class ContinueTriggerDemo
{

    /** Host of the text socket the job reads from. */
    private static final String HOST_NAME = "localhost";

    /** Port of the text socket the job reads from. */
    private static final int PORT = 8001;

    /** Length of the tumbling processing-time window, in seconds. */
    private static final long WINDOW_SECONDS = 120;

    /** Interval at which the continuous trigger fires, in seconds. */
    private static final long TRIGGER_INTERVAL_SECONDS = 20;

    /**
     * Builds the streaming pipeline and submits it for execution.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job cannot be executed
     */
    public static void main(String[] args) throws Exception
    {
        // Set up the execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        // Read the input data from the configured socket.
        DataStream<String> text = env.socketTextStream(HOST_NAME, PORT);

        text.flatMap(new LineSplitter()) // split each input line into words
                .keyBy(0) // partition the stream by tuple field 0 (the word, per the original comment)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(WINDOW_SECONDS))) // 120 s tumbling window
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(TRIGGER_INTERVAL_SECONDS))) // emit the current result every 20 s
                .sum(1) // sum tuple field 1 (the count)
                .map(new Mapdemo()) // per the original comment, adds a timestamp to each result (Mapdemo is defined elsewhere)
                .print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

}